构建自定义的同步工具
内置的条件队列
条件队列就如同烤面包机上的面包已好的铃声。如果你正在听着它, 当面包烤好后你可以立即注意到, 并且放下手头的事情开始品尝面包, 如果你没有听见它, 你会错过通知消息, 但是回到厨房后还是看到面包的状态, 如果已经烤完, 就取面包, 如果未烤完, 就再次监听铃声。
条件队列中的元素是一个个正在等待相关条件的线程;每一个对象都可以作为一个条件队列,并且 Object
的 wait
,notify
和 notifyAll
构成了内部条件队列的 API。
wait
:wait
是等待的意思,调用wait
会自动释放锁,并请求系统挂起当前线程,从而使其他线程能够获得这个锁 。notify
:发出通知,解除阻塞条件,JVM
会从这个条件队列上等待的多个线程选择一个来唤醒 。notifyAll
:发出通知,解除阻塞条件,JVM
会唤醒所有在这个条件队列上等待的线程。- 条件谓语:线程等待的条件
条件等待中存在一个很重要的三元关系:synchronized
,wait
和一个条件谓语。 条件变量由一个锁保护,检查条件谓语时必须先持有锁,调用wait
和notifyAll
所在方法的对象必须是同一个对象。
用下面这个代码来解析:
1 | public synchronized V take() throws InterruptedException{ |
在这块代码中,isEmpty()
就是take()
方法的条件谓语,A
线程如果判断为空,将会调用wait
,阻塞A
线程,直到其它线程B
操作使isEmpty()
为false
,接着B
调用notifyAll
,释放锁后,唤醒线程A
。
notify
和notifyAll
的区别:大多数情况下,应该优先选择notifyAll
。
原因:假如线程A
在条件队列上等待条件谓语PA
,线程B
在同一个条件队列上等待条件谓语PB
,假如线程C
将PB
变为真,且调用notify
,JVM
将从众多的等待线程选择其中A
来唤醒,但是A
看到PA
仍然为false
,于是继续等待,然而线程B
本可以开始执行,却没有被唤醒。
只有同时满足以下两个条件时,才能用单一的 notify 而不是 notifyAll:
- 所有等待线程的类型都相同:只有一个条件谓词与条件队列相关,并且每个线程在从 wait 返回后将执行相同的操作。
- 单进单出:在条件变量上的每次通知,最多只能唤醒一个线程来执行。
如果有10个线程在条件队列中等待, 调用 notifyAll 会唤醒每一个线程, 让它们去竞争锁, 然后它们中的大多数或者全部又回到休眠状态, 这意味着每一个激活单一线程执行的事件, 都会带来大量的上下文切换, 和大量竞争锁的请求。
Condition 对象
内置的条件队列有一些缺陷,每一个内置锁都只能由一个相关联的条件队列。如果想要编写一个带有多个条件谓语的并发对象,可以使用Lock
和Condition
。
一个Condition
和一个单独的Lock
相关联, 调用Lock.newCondition()
方法, 可以创建一个Condition
。每个Lock
可以有任意数量的Condition
对象。wait
, notify
, notifyAll
在Condition
中都有对应的:await
, signal
, signalAll
, 而且一定要使用后者!
示例:
1 | public class ConditionBoundedBuffer<T> { |
AbstractQueuedSynchronier (AQS)
同步的实现都是基于AbstractQueuedSynchronizer
(AQS
),AQS
是一个用于构建锁和同步器的框架,例如ReentrantLock
,Semaphore
,CountDownLatch
等都是基于AQS
构建的。
AQS
构建的容器中,最基本的就是获取操作和释放操作,对于CountDownLatch
,获取意味着等待并直到闭锁到达结束状态,对于FutureTask
,获取意味着等待直到任务已经完成。
AQS
负责同步容器类中的状态,它管理了一个整数状态信息,可以通过getState
,setState
以及compareAndSetState
来设置和获取。例如ReentrantLock
用它来表示线程已经重复获取该锁的次数,Semaphore
用它来表示剩余的许可数量,FutureTask
用它来表示任务的状态(尚未开始,正在运行,已完成以及以取消)
AQS
获取操作可能是独占的, 就像ReentrantLock
一样, 也可能是非独占的, 就像Semaphore
和CountDownLatch
一样, 这取决于不同的Synchronizer
下面使用AQS
来实现一个闭锁:(事实上,同步容器就是这样做的)
1 | public class OneShotLatch { |
代码中通过AQS
管理闭锁的状态:关闭0, 打开1。await
方法调用AQS
的acqurieSharedInterruptibly
, 后者随后请求OneShotLatch
中的tryAcquireShared
方法必须返回一个值来表明操作能否继续执行。如果闭锁已经事先打开, tryAcquireShared
会返回成功, 并允许线程通过;否则他会返回一个值, 表明获取请求的尝试失败。acquireSharedInterruptibly
方法处理失败的方式, 是把线程植入一个队列中, 该队列中的元素都是等待中的线程, signal
调用releaseShared
, 进而导致tryReleaseShared
被调用。tryReleasseShared
的实现无条件地把闭锁的状态设置为打开, 通过返回值表明Synchronizer
处于完全释放的状态。
java.util.concurrent 同步类中的 AQS
java.util.concurrent
同步类都没有直接扩展AQS
,而是将他们的功能委托给AQS
子类实现。java.util.concurrent
中基于AQS
开发的类有:ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,CountDownLatch
,SynchronousQueue
和FutureTask
等。
原子变量与非阻塞同步机制
非阻塞算法被广泛用于操作系统和JVM中实现线程/进程调度机制,垃圾回收机制以及锁和其它并发数据结构。
比较并交换(CAS)
概念:
- 比较并交换有三个操作数: 内存位置
V
,进行比较的值A
,新值B
。 - 当且仅当
V
的值等于旧值A
时,CAS
才会用新值B
原子化地更新V
的值, 否则它什么都不会做。 - 无论位置
V
的值是否等于A
,CAS都会返回V
的真实值。CAS
的意思是: 我认为V
的值应该是A
, 如果是, 那么将B
值赋值给V
, 若不是, 则不修改, 并告诉我V
的旧值为多少。 CAS
是一项乐观技术: 它抱着成功的希望进行更新, 并且如果另一个线程在上次检查后更新了该变量, 它能够发现错误 。- 当多个线程试图使用
CAS
同时更新相同的变量时, 只有一个线程会更新变量的值,而其他的都会失败。然而,失败的线程不会被挂起, 他们会被告知这次赛跑失利, 但是允许重试。由于一个线程不会竞争CAS
时不会被阻塞,因此它可以决定是否重试。
CAS
需要由硬件辅助实现。
CAS
的典型使用模式: 首先从V
中读取值A
,根据A计算值B
,然后通过CAS
以原子操作将V
的值A
变为B
。
一个模拟的CAS
:
1 | public class SimulateCAS { |
基于CAS
实现的线程安全的计数器:
1 | public class CasCounter { |
非阻塞算法
如果线程在持有锁时由于阻塞IO
,内存页缺失或其他延迟导致推迟执行,那么很可能所有的线程都不能继续执行下去。
- 非阻塞算法:如果在某种算法中,一个线程的失败或者挂起不会导致其他线程也失败或挂起,那么这种算法就被称为非阻塞算法。
- 无锁算法:如果在算法的每一个步骤都存在某个线程能够执行下去,那么这种算法称为无锁算法(
Lock-Free
)。 - 无阻塞,无锁算法:如果算法中仅将
CAS
用于协调线程之间的操作,并且能够正确地实现,那么它是一种无阻塞,无锁算法。
上面基于CAS
实现的线程安全的计数器就是无阻塞算法实现
基于非阻塞的Stack
实现:
1 | public class ConcurrentStack<E> { |
基于非阻塞的链表:
1 | public class LinkedQueue<E> { |